{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# wide-deep模型架构"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**wide-deep**模型是业界点击率预估最常用的模型，这个模型的参数主要由两部分组成：\n",
    "1. sparse参数。\n",
    "    在下图中由`Sparse Features`标识的区域全部为sparse特征。\n",
    "2. dense参数。\n",
    "    除了sparse的特征之外，都可以视作dense的特征。\n",
    "    \n",
    "一般的，在真实场景中`Sparse Features`中包含几十或几百个`feature`（下图`Sparse Features`部分每一个圆圈即为一个`feature`），比如`广告id`、`用户id`、`用户所在的城市`等等都属于一个`feature`，通常每个`feature`对应不同的维度，从几十到几亿不等，比如`用户id`往往会大于一亿维（有一亿个用户），`用户所在的城市`不到1000维。当`feature`比较多的时候要从这些高维的`feature`中获取参数成了训练这种模型的极大挑战。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"pic/wide-deep.png\"/>[image](./pic/wide-deep.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## slot"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在TensorNet中将上面模型中的`feature`统一抽象称为`slot`，每一个`feature`即为一个`slot`，但构造复杂模型的时候每个`slot`可以不局限为一个`feature`。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## sign"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "对于每个`feature`的每个维度可以用一个字符串或数字表示，比如`城市`有`北京`、`天津`等等，每一个`广告id`都是一个int64的值。TensorNet要求这些值要全部哈希成一个uint64的值，我们称之为`sign`。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## TensorFlow原生版的wide-deep模型"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "TEST_DATA_PATH = '/tmp/wide-deep-test/data'\n",
    "!mkdir -p $TEST_DATA_PATH"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 生成测试数据"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面脚本生成两个`part`的数据，每个`part`有12000个样本，每个样本都有4个`feature`。每个`feature`维度为65536维"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "\n",
    "def serialize_example(slots):\n",
    "    fea_desc = {}\n",
    "\n",
    "    label = random.choice([1, 0])\n",
    "    fea_desc[\"label\"] = tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))\n",
    "\n",
    "    for slot in slots:\n",
    "        values = [random.randint(1, 2**16)]\n",
    "        fea_desc[slot] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))\n",
    "\n",
    "    example_proto = tf.train.Example(features=tf.train.Features(feature=fea_desc))\n",
    "\n",
    "    return example_proto.SerializeToString()\n",
    "\n",
    "def generate_data(name):\n",
    "    slots = [\"1\", \"2\", \"3\", \"4\"]\n",
    "    count = 12000\n",
    "\n",
    "    with tf.io.TFRecordWriter(TEST_DATA_PATH + \"/tf-part.%s\" % name) as writer:\n",
    "        for i in range(count):\n",
    "            example = serialize_example(slots)\n",
    "            writer.write(example)\n",
    "\n",
    "generate_data('00001')\n",
    "generate_data('00002')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total 2088\r\n",
      "-rw-rw-r-- 1 zhangyansheng zhangyansheng 1067871 Sep  8 10:02 tf-part.00001\r\n",
      "-rw-rw-r-- 1 zhangyansheng zhangyansheng 1067927 Sep  8 10:02 tf-part.00002\r\n"
     ]
    }
   ],
   "source": [
    "!ls -l $TEST_DATA_PATH"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 编写模型 "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Config(object):\n",
    "    FILE_MATCH_PATTERN = \"tf-*\"\n",
    "    BATCH_SIZE = 32\n",
    "    DEEP_HIDDEN_UNITS = [512, 256, 256]\n",
    "\n",
    "    WIDE_SLOTS = [ \"1\",\"2\",\"3\",\"4\"]\n",
    "    DEEP_SLOTS = [ \"1\",\"2\",\"3\",\"4\"]\n",
    "    \n",
    "C = Config"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面代码通过TensorFlow的dataset读取数据。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "def parse_line_batch(example_proto):\n",
    "    fea_desc = {\n",
    "        \"label\": tf.io.FixedLenFeature([], tf.int64)\n",
    "    }\n",
    "\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        fea_desc[slot]  = tf.io.VarLenFeature(tf.int64)\n",
    "\n",
    "    feature_dict = tf.io.parse_example(example_proto, fea_desc)\n",
    "\n",
    "    label = feature_dict.pop('label')\n",
    "    return feature_dict, label\n",
    "\n",
    "def read_dataset(data_path, match_pattern):\n",
    "    data_file = os.path.join(data_path, match_pattern)\n",
    "    dataset = tf.data.Dataset.list_files(data_file)\n",
    "\n",
    "    dataset = dataset.interleave(lambda f: tf.data.TFRecordDataset(f, buffer_size=1024 * 100))\n",
    "    dataset = dataset.batch(C.BATCH_SIZE)\n",
    "    dataset = dataset.map(map_func=parse_line_batch)\n",
    "    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)\n",
    "\n",
    "    return dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面代码使用TensorFlow的feature_column构造模型，wide部分每个`feature`的`embedding_size`为1，deep部分每个`feature`的`embedding_size`为8。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def columns_builder():\n",
    "    column_1 = tf.feature_column.categorical_column_with_hash_bucket(\n",
    "      '1', hash_bucket_size = 2**16, dtype=tf.int64)\n",
    "    column_2 = tf.feature_column.categorical_column_with_hash_bucket(\n",
    "      '2', hash_bucket_size = 2**16, dtype=tf.int64)\n",
    "    column_3 = tf.feature_column.categorical_column_with_hash_bucket(\n",
    "      '3', hash_bucket_size = 2**16, dtype=tf.int64)\n",
    "    column_4 = tf.feature_column.categorical_column_with_hash_bucket(\n",
    "      '4', hash_bucket_size = 2**16, dtype=tf.int64)\n",
    "    \n",
    "    wide_columns = [\n",
    "        tf.feature_column.embedding_column(column_1, dimension=1),\n",
    "        tf.feature_column.embedding_column(column_2, dimension=1),\n",
    "        tf.feature_column.embedding_column(column_3, dimension=1),\n",
    "        tf.feature_column.embedding_column(column_4, dimension=1),\n",
    "    ]\n",
    "    \n",
    "    deep_columns = [\n",
    "        tf.feature_column.embedding_column(column_1, dimension=8),\n",
    "        tf.feature_column.embedding_column(column_2, dimension=8),\n",
    "        tf.feature_column.embedding_column(column_3, dimension=8),\n",
    "        tf.feature_column.embedding_column(column_4, dimension=8),\n",
    "    ]\n",
    "\n",
    "    return wide_columns, deep_columns\n",
    "\n",
    "def create_model(wide_columns, deep_columns):\n",
    "    wide, deep = None, None\n",
    "\n",
    "    inputs = {}\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype=\"int64\", sparse=True)\n",
    "\n",
    "    if wide_columns:\n",
    "        wide = tf.keras.layers.DenseFeatures(wide_columns, name='wide_inputs')(inputs)\n",
    "\n",
    "    if deep_columns:\n",
    "        deep = tf.keras.layers.DenseFeatures(deep_columns, name='deep_inputs')(inputs)\n",
    "\n",
    "        for i, unit in enumerate(C.DEEP_HIDDEN_UNITS):\n",
    "            deep = tf.keras.layers.Dense(unit, activation='relu', name='dnn_{}'.format(i))(deep)\n",
    "\n",
    "    if wide_columns and not deep_columns:\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(wide)\n",
    "    elif deep_columns and not wide_columns:\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep)\n",
    "    else:\n",
    "        both = tf.keras.layers.concatenate([deep, wide], name='both')\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)\n",
    "\n",
    "    model = tf.keras.Model(inputs, output)\n",
    "\n",
    "    model.compile(optimizer='adam',\n",
    "                  loss='binary_crossentropy',\n",
    "                  metrics=['accuracy', tf.keras.metrics.AUC(),])\n",
    "\n",
    "    return model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train(strategy):\n",
    "    with strategy.scope():\n",
    "        wide_column, deep_column = columns_builder()\n",
    "        model = create_model(wide_column, deep_column)\n",
    "\n",
    "        train_dataset = read_dataset(TEST_DATA_PATH, C.FILE_MATCH_PATTERN)\n",
    "        model.fit(train_dataset, epochs=1, verbose=1)\n",
    "\n",
    "    return"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "750/750 [==============================] - 10s 14ms/step - loss: 0.6953 - accuracy: 0.4954 - auc: 0.4994\n"
     ]
    }
   ],
   "source": [
    "train(tf.distribute.get_strategy())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## TensorFlow原生模型存在的问题"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "上面的代码在定义feature_column的时候实际上TensorFlow会定一个shape为(65535, 8)的矩阵，当其中配置的`hash_bucket_size`越大的时候这个矩阵越大。\n",
    "```\n",
    "    column_1 = tf.feature_column.categorical_column_with_hash_bucket(\n",
    "      '1', hash_bucket_size = 2**16, dtype=tf.int64)\n",
    "    tf.feature_column.embedding_column(column_1, dimension=8),\n",
    "```\n",
    "很显然：\n",
    "   当一个`feature`维度上亿的时候，这个矩阵所占用的内存空间会很大，当特征多的时候就可能面临单机内存不够的情况。\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 如何分布式训练？"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "上面代码中面临的最重要的一个问题是分布式训练，tensorflow提供了多种分布式训练的模式，见：https://www.tensorflow.org/guide/distributed_training 。自TensorFlow 2.0之后的版本对ParameterServer的支持较少，对多机多卡同步训练支持的较好。对于上面代码很容易切换到多机多卡的模式训练，只需要更改下面代码，然后通过`TF_CONFIG`配置`worker`节点的地址即可。\n",
    "```\n",
    "    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 使用TensorNet训练wide-deep模型"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "TensorNet可以完美避免上面提到的矩阵过大的问题，使特征维度支持到近无限，我们通过下面的例子感受一下。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "sys.path.append('/da2/zhangyansheng/tensornet') # 在此设置您的tensornet包的位置"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensornet as tn\n",
    "\n",
    "def columns_builder():\n",
    "    columns = {}\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        columns[slot] = tn.feature_column.category_column(key=slot)\n",
    "\n",
    "    wide_columns = []\n",
    "    for slot in C.WIDE_SLOTS:\n",
    "        feature_column = tf.feature_column.embedding_column(columns[slot], dimension=1)\n",
    "        wide_columns.append(feature_column)\n",
    "\n",
    "    deep_columns = []\n",
    "    for slot in C.DEEP_SLOTS:\n",
    "        feature_column = tf.feature_column.embedding_column(columns[slot], dimension=8)\n",
    "        deep_columns.append(feature_column)\n",
    "\n",
    "    return wide_columns, deep_columns\n",
    "\n",
    "def create_model(wide_columns, deep_columns):\n",
    "    wide, deep = None, None\n",
    "\n",
    "    inputs = {}\n",
    "    for slot in set(C.WIDE_SLOTS + C.DEEP_SLOTS):\n",
    "        inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype=\"int64\", sparse=True)\n",
    "\n",
    "    sparse_opt = tn.core.AdaGrad(learning_rate=0.01, initial_g2sum=0.1, initial_scale=0.1)\n",
    "\n",
    "    if wide_columns:\n",
    "        wide = tn.layers.EmbeddingFeatures(wide_columns, sparse_opt, name='wide_inputs', is_concat=True)(inputs)\n",
    "\n",
    "    if deep_columns:\n",
    "        deep = tn.layers.EmbeddingFeatures(deep_columns, sparse_opt, name='deep_inputs', is_concat=True)(inputs)\n",
    "\n",
    "        for i, unit in enumerate(C.DEEP_HIDDEN_UNITS):\n",
    "            deep = tf.keras.layers.Dense(unit, activation='relu', name='dnn_{}'.format(i))(deep)\n",
    "\n",
    "    if wide_columns and not deep_columns:\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(wide)\n",
    "    elif deep_columns and not wide_columns:\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep)\n",
    "    else:\n",
    "        both = tf.keras.layers.concatenate([deep, wide], name='both')\n",
    "        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)\n",
    "\n",
    "    model = tn.model.Model(inputs, output)\n",
    "\n",
    "    dense_opt = tn.core.Adam(learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8)\n",
    "    model.compile(optimizer=tn.optimizer.Optimizer(dense_opt),\n",
    "                  loss='binary_crossentropy',\n",
    "                  metrics=['accuracy', tf.keras.metrics.AUC(),])\n",
    "\n",
    "    return model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "750/750 [==============================] - 6s 8ms/step - loss: 0.6933 - accuracy: 0.5000 - auc_1: 0.5026\n"
     ]
    }
   ],
   "source": [
    "train(tn.distribute.PsStrategy())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### TensorNet的改动"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "从上面的例子可以看出，TensorNet版的wide-deep相较于TensorFlow原生版的主要由如下**5**个改动：\n",
    "1. 分布式训练`strategy`改为了`tn.distribute.PsStrategy()`。 \n",
    "2. 将sparse特征的feature column统一使用`tn.feature_column.category_column`适配。\n",
    "3. 将模型的第一层统一使用`tn.layers.EmbeddingFeatures`替换。\n",
    "4. 将`tf.keras.Model`切换为`tn.model.Model。\n",
    "5. 将optimizer切换为`tn.optimizer.Optimizer`。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### TensorNet的提升"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们可以看出：\n",
    "1. TensorFlow原生版中feature column所需要的`hash_bucket_size`在TensorNet中没有了，我们可以支持到特征的维度扩展到了2\\**64。\n",
    "2. 不用在担心embedding tensor过大的问题。实际上tensornet内部始终使用一个较小的tensor存储当前batch所需要的特征，极大的提升了性能。\n",
    "2. 对于模型结构相对原生版破坏较少。只需要更改模型的第一层，其它结构完全可以不变。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### TensorNet分布式训练"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "如果您将上面的代码统一拷贝到`wide_deep.py`脚本中，那么便可以使用`mpirun`运行进行分布式训练，如下启动2个节点同时进行训练。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "! /da2/zhangyansheng/openmpi-1.4.5/bin/mpirun -n 2 python wide_deep.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 总结"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "上面的例子是一个单机版较为简单的例子，相信大家对TensorNet的使用已经有了一个简单的了解，后面的tutorial中我们会介绍分布式部署及在线预估等方案。"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
